/**
 * Computes the mean (average) duration of events from a whitespace-separated
 * trace file, grouping the input into batches of 1000 records per output key.
 * 
 * @author cristina
 */

package org.PP;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class MediumEvents {

	public static class Map extends MapReduceBase implements
			Mapper<LongWritable, Text, Text, DoubleWritable> {
		/** Reused output key; avoids allocating a new Text per record. */
		private static Text label = new Text();
		// Records seen since the last key rollover; a new label is
		// generated every 1000 entries so durations are averaged in batches.
		static long anotherLabel = 0;
		static long labels = 0; // number of distinct keys generated so far

		/** Counter incremented each time a new label batch is started. */
		public enum MapDuration {
			AHAM
		};

		/**
		 * Parses one whitespace-separated trace line and emits
		 * (batch label, end - start). Columns 7 and 8 are the job's start
		 * and end times respectively; lines beginning with '#' are comments.
		 * Malformed lines with fewer than 8 columns are skipped instead of
		 * emitting a bogus duration (previously they produced 0 or -start).
		 */
		public void map(LongWritable key, Text value,
				OutputCollector<Text, DoubleWritable> output, Reporter reporter)
				throws IOException {

			Map.label.set("medium duration " + Map.labels);

			if (Map.anotherLabel == 1000) {
				Map.anotherLabel = 0;
				Map.labels++;

				// reporter.setStatus("map " + label.toString());
				reporter.incrCounter(MapDuration.AHAM, 1);
			}

			String line = value.toString();
			// skip comments
			if (line.startsWith("#")) {
				return;
			}

			// columns 7 and 8 are start, end time of job respectively
			StringTokenizer tokenizer = new StringTokenizer(line);
			int count = 0;
			double start = 0, end = 0;
			boolean haveBoth = false; // true once column 8 was parsed
			anotherLabel++;
			while (tokenizer.hasMoreTokens()) {
				count++;
				String token = tokenizer.nextToken();
				if (count == 7)
					start = Double.parseDouble(token);
				if (count == 8) {
					end = Double.parseDouble(token);
					haveBoth = true;
					// stop, we have all the information
					break;
				}
			}

			if (haveBoth) {
				output.collect(label, new DoubleWritable(end - start));
			}
		}
	}

	public static class Combiner extends MapReduceBase implements
			Reducer<Text, DoubleWritable, Text, DoubleWritable> {

		/**
		 * Pre-aggregates the durations for one key by emitting their mean.
		 *
		 * BUG FIX: a combiner must emit the SAME key it received. The
		 * previous code emitted the constant "medium duration", which
		 * collapsed every per-batch label produced by the mapper
		 * ("medium duration 0", "medium duration 1", ...) into a single
		 * group and broke the partitioning to reducers.
		 *
		 * NOTE(review): averaging here and averaging again in the reducer
		 * yields an unweighted mean of partial means, not the true mean of
		 * all durations; a correct fix would require emitting (sum, count)
		 * pairs, which changes the intermediate value type.
		 */
		public void reduce(Text key, Iterator<DoubleWritable> values,
				OutputCollector<Text, DoubleWritable> output, Reporter reporter)
				throws IOException {
			double sum = 0; // primitives avoid deprecated new Double/Long(..)
			long count = 0;

			while (values.hasNext()) {
				count++;
				sum += values.next().get();
			}
			double med = count > 0 ? sum / count : 0;
			output.collect(key, new DoubleWritable(med));
		}
	}

	public static class Reduce extends MapReduceBase implements
			Reducer<Text, DoubleWritable, Text, DoubleWritable> {

		/**
		 * Emits the mean of all duration values collected under {@code key}.
		 * Uses primitive accumulators instead of the deprecated boxed
		 * {@code new Double(0)} / {@code new Long(0)} constructors, which
		 * also avoids autoboxing on every loop iteration.
		 */
		public void reduce(Text key, Iterator<DoubleWritable> values,
				OutputCollector<Text, DoubleWritable> output, Reporter reporter)
				throws IOException {
			double sum = 0;
			long count = 0;

			while (values.hasNext()) {
				count++;
				sum += values.next().get();
			}
			// Guard against an empty iterator (division by zero -> NaN).
			double med = count > 0 ? sum / count : 0;
			output.collect(key, new DoubleWritable(med));
		}
	}
	
	/*
	 * public static class Reduce extends MapReduceBase implements Reducer<Text,
	 * DoubleWritable, Text, DoubleWritable> {
	 * 
	 * static Text label = new Text(); static long anotherLabel = 0; // groups
	 * data by generating a new label each 1.000.000 entries static long labels
	 * = 0; // number of keys generated
	 * 
	 * public enum RedDuration { AHAM };
	 * 
	 * 
	 * public void reduce(Text key, Iterator<DoubleWritable> values,
	 * OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws
	 * IOException { Double sum = new Double(0); Long count = new Long(0);
	 * Double med = new Double(0);
	 * 
	 * Reduce.label.set("medium duration "+Reduce.labels);
	 * 
	 * if(Reduce.anotherLabel == 100) { Reduce.anotherLabel = 0;
	 * Reduce.labels++;
	 * 
	 * //reporter.setStatus("map " + label.toString());
	 * reporter.incrCounter(RedDuration.AHAM, 1); }
	 * 
	 * Reduce.anotherLabel++;
	 * 
	 * 
	 * while (values.hasNext()) { count++; sum += values.next().get(); }
	 * if(count > 0) med = sum/count; else med = new Double(0);
	 * output.collect(Reduce.label, new DoubleWritable(med));
	 * 
	 * } }
	 */

	/**
	 * Configures and runs the job: text input -> Map -> Combiner ->
	 * Reduce (5 reduce tasks) -> text output.
	 *
	 * @param args args[0] = input path, args[1] = output path
	 */
	public static void main(String[] args) throws Exception {
		// Fail with a usage message instead of an
		// ArrayIndexOutOfBoundsException when paths are missing.
		if (args.length < 2) {
			System.err.println("Usage: MediumEvents <input path> <output path>");
			System.exit(2);
		}

		JobConf conf = new JobConf(MediumEvents.class);
		conf.setJobName("mediumEvents");

		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(DoubleWritable.class);

		conf.setMapperClass(Map.class);
		conf.setCombinerClass(Combiner.class);
		conf.setReducerClass(Reduce.class);

		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);

		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));

		conf.setNumReduceTasks(5);

		JobClient.runJob(conf);
	}
}
